package com.clp.protocol.iec104.server.pipeline.state;

import com.clp.protocol.iec104.apdu.*;
import com.clp.protocol.iec104.apdu.apci.IApci;
import com.clp.protocol.iec104.apdu.apci.ICtrlArea;
import com.clp.protocol.iec104.apdu.apci.SApci;
import com.clp.protocol.iec104.apdu.apci.SCtrlArea;
import com.clp.protocol.iec104.apdu.asdu.IAsdu;
import com.clp.protocol.iec104.definition.ConstVal;
import com.clp.protocol.iec104.common.IncorrectRecvSeqException;
import com.clp.protocol.iec104.common.IncorrectSendSeqException;
import com.clp.protocol.iec104.common.IncorrectSeqException;
import com.clp.protocol.iec104.server.SlaveControlConfig;
import com.clp.protocol.iec104.server.pipeline.PipelineManager;
import com.clp.protocol.iec104.server.pipeline.AsduSendEntrance;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * 关口处理器：序号控制；将APDU解封装为ASDU；kw流量控制；报文过滤；...
 */
@Slf4j
public class GateStateHandler extends AbstractStateHandler implements AsduSendEntrance {
    @Getter
    private final int k;
    @Getter
    private final int w;

    private int currRecvCount; // 当前累计接收但未确认I帧数
    private int currSendCount; // 当前累计发送但未被确认I帧数
    private Queue<PromisedIAsduData> queue = new ArrayDeque<>(32); // 等待发送的队列
    private Queue<PromisedIAsduData> tempQueue = new ArrayDeque<>(32); // 临时队列

    @Getter
    private volatile int sendSeq; // 发送序号，最大值32768-1
    @Getter
    private volatile int recvSeq; // 接收序号，最大值32768-1
    private volatile int ackSeq; // 发送出去的已被接收到的序号

    public GateStateHandler(PipelineManager manager, SlaveControlConfig cfg) {
        super(manager);
        this.k = cfg.getK();
        this.w = cfg.getW();
    }

    @Override
    protected void resetState() {
        this.currSendCount = 0;
        this.currRecvCount = 0;
        this.sendSeq = 0;
        this.recvSeq = 0;
        this.ackSeq = 0;
    }

    @Override
    protected void afterResetState() {

    }

    /**
     * 会将 Apdu 解封装为 Asdu ，并且应用过滤条件
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg == null) return;
        if (!(msg instanceof Apdu)) {
            ctx.fireChannelRead(msg);
            return;
        }

        Apdu apdu = (Apdu) msg;
        IAsdu iAsdu = null;
        try {
            if (apdu.isUType()) {
                // 是U帧
            } else if (apdu.isSType()) {
                // 检查序号
                SApci sApci = apdu.castToSType().getSApci();
                SCtrlArea sControlArea = sApci.getCtrlArea();
                int remoteRecvSeq = sControlArea.getRecvSeq();
                if (remoteRecvSeq > sendSeq) {
                    throw new IncorrectRecvSeqException(remoteRecvSeq);
                }

                // 更新确认序号
                ackSeq = remoteRecvSeq;

                // 尝试处理等待发送队列
                currSendCount = 0;
                resendWaitingIAsdu();
            } else if (apdu.isIType()) {
                IApdu iApdu = apdu.castToIType();
                IApci iApci = iApdu.getIApci();
                ICtrlArea iControlArea = iApci.getCtrlArea();
                iAsdu = iApdu.getIAsdu();

                // 检查序号
                int remoteSendSeq = iControlArea.getSendSeq();
                int remoteRecvSeq = iControlArea.getRecvSeq();
                if (remoteSendSeq != recvSeq) {
                    throw new IncorrectSendSeqException(remoteSendSeq);
                }
                // 处理接收序号
                recvSeq = (remoteSendSeq + 1) % ConstVal.MAX_SEQ;
                // 处理确认序号
                ackSeq = remoteRecvSeq;

                // 处理累计接收
                currRecvCount = (currRecvCount + 1) % w;
                // 一定要返回确认帧的情况（重置计数）
                boolean needResetCurrRecvCount = false;
                switch (iAsdu.getTypeTag()) {
                    case C_IC_NA_1: // 总召唤
                        switch (iAsdu.getCot().getCause()) {
                            case COT_ACTTERM: // 激活终止
                                needResetCurrRecvCount = true;
                        }
                        break;
                }
                if (needResetCurrRecvCount) currRecvCount = 0;
                // 如果当前累计接收到达最大值，发送一个确认帧
                if (currRecvCount == 0) {
                    // 发送链路测试确认
                    sApduSender().chSend(recvSeq);
                    log.info("[Gate] reach w={}, send ack", w);
                }

                // 尝试处理等待发送队列
                currSendCount = 0;
                resendWaitingIAsdu();
            }

            if (iAsdu == null) return;
            if (!controlInfo().isInitCompleted()) {
                log.warn("[Gate] 初始化未结束，丢弃Asdu：{}", iAsdu);
                return;
            }
            if (!controlInfo().isStartedDt()) {
                log.warn("[Gate] 未启动传输，丢弃Asdu：{}", iAsdu);
                return;
            }
            ctx.fireChannelRead(iAsdu);
        } catch (IncorrectSeqException e) {
            log.warn("[Gate] I帧失序，原因：{}，即将断开连接", e.getMessage());
            channel.close();
        }
    }

    private void resendWaitingIAsdu() {
        Queue<PromisedIAsduData> q = queue;
        queue = tempQueue;
        tempQueue = q;

        PromisedIAsduData data;
        while ((data = tempQueue.poll()) != null) {
            log.info("[Gate] 从缓冲区中获取Asdu发送");
            tryChannelSend(data);
        }
    }

    private void tryChannelSend(PromisedIAsduData data) {
        if (currSendCount < k) {
            // 发送窗口未满，直接发送
            currSendCount++;
            ctx.channel().writeAndFlush(data.iAsdu, data.promise);
            return;
        }
        // 如果发送窗口已满，存入等待队列
        log.info("[Gate] 发送Asdu数达到k值，新Asdu存入缓冲区");
        boolean isPushed = queue.offer(data);
        if (!isPushed) {
            String msg = "等待队列已满，发送失败！";
            log.warn("[Gate] " + msg);
            data.promise.setFailure(new IllegalStateException(msg));
        }
    }

    @Override
    public void tryChannelSend(IAsdu iAsdu, ChannelPromise promise) {
        tryChannelSend(new PromisedIAsduData(iAsdu, promise));
    }

    /**
     * 会允许所有的Asdu发送，不会进行拦截操作
     * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
     * @param msg               the message to write
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg == null) return;
        if (msg instanceof Apdu) {
            ctx.write(msg, promise);
            return;
        }
        if (msg instanceof IAsdu) {
            // 包装成 apdu
            Apdu apdu = ApduFactory.getIApdu(sendSeq, recvSeq, (IAsdu) msg);
            // 更新发送序号
            sendSeq = (sendSeq + 1) % ConstVal.MAX_SEQ;
            ctx.write(apdu, promise);
            return;
        }
        ctx.write(msg, promise);
    }

    @Nullable
    @Override
    protected ScheduledTask getScheduledTask() {
        return null;
    }

    private static class PromisedIAsduData {
        private final IAsdu iAsdu;
        private final ChannelPromise promise;

        public PromisedIAsduData(IAsdu iAsdu, ChannelPromise promise) {
            this.iAsdu = iAsdu;
            this.promise = promise;
        }
    }
}
